{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pyspark notebook (python3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Example commands for running on Google Cloud Dataproc - https://cloud.google.com/dataproc/\n",
    "\n",
    "```\n",
    "gcloud dataproc clusters create cchio-persistent-1 \\\n",
    "    --metadata \"JUPYTER_PORT=8123,JUPYTER_CONDA_PACKAGES=numpy:pandas:scipy:scikit-learn\" \\\n",
    "    --initialization-actions \\\n",
    "        gs://dataproc-initialization-actions/jupyter/jupyter.sh \\\n",
    "    --zone us-central1-c \\\n",
    "    --num-workers 4 \\\n",
    "    --properties spark:spark.executorEnv.PYTHONHASHSEED=0,spark:spark.yarn.am.memory=1024m \\\n",
    "    --worker-machine-type=n1-standard-8 \\\n",
    "    --master-machine-type=n1-standard-8\n",
    "    \n",
    "gcloud compute ssh --zone=us-central1-c \\\n",
    "  --ssh-flag=\"-D\" --ssh-flag=\"10000\" --ssh-flag=\"-N\" --ssh-flag=\"-n\" \"cchio-persistent-1-m\" &\n",
    "  \n",
    "/Applications/Google\\ Chrome.app/Contents/MacOS/Google\\ Chrome \\\n",
    "    \"http://cchio-persistent-1-m:8123\" \\\n",
    "    --proxy-server=\"socks5://localhost:10000\" \\\n",
    "    --host-resolver-rules=\"MAP * 0.0.0.0 , EXCLUDE localhost\" \\\n",
    "    --user-data-dir=/tmp/\n",
    "```\n",
    "\n",
    "Defaults to python3 - if you want to use python2, add `--metadata MINICONDA_VARIANT=2`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import os\n",
    "import sys\n",
    "sys.path.append('../chapter1')\n",
    "import email_read_util\n",
    "\n",
    "from pyspark.sql.types import *\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.feature import Tokenizer, CountVectorizer\n",
    "from pyspark.ml.classification import RandomForestClassifier\n",
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Download 2007 TREC Public Spam Corpus\n",
    "1. Read the \"Agreement for use\"\n",
    "   https://plg.uwaterloo.ca/~gvcormac/treccorpus07/\n",
    "\n",
    "2. Download 255 MB Corpus (trec07p.tgz) and untar into the 'chapter1/datasets' directory\n",
    "\n",
    "3. Check that the paths assigned to `DATA_DIR` and `LABELS_FILE` in the next cell exist"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "DATA_DIR = '../chapter1/datasets/trec07p/data/'\n",
    "LABELS_FILE = '../chapter1/datasets/trec07p/full/index'\n",
    "TRAINING_SET_RATIO = 0.7"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "labels = {}\n",
    "# Read the labels.\n",
    "with open(LABELS_FILE) as f:\n",
    "    for line in f:\n",
    "        line = line.strip()\n",
    "        label, key = line.split()\n",
    "        labels[key.split('/')[-1]] = 1 if label.lower() == 'ham' else 0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def read_email_files():\n",
    "    \"\"\"Load the text and label of every email named in `labels`.\n",
    "\n",
    "    Files are read from DATA_DIR in the order 'inmail.1' ... 'inmail.N',\n",
    "    where N is len(labels).\n",
    "\n",
    "    Returns:\n",
    "        (X, y): X holds the result of email_read_util.extract_email_text\n",
    "        for each file, y holds the matching entries of `labels`, in the\n",
    "        same order.\n",
    "\n",
    "    Raises:\n",
    "        KeyError: if `labels` has no entry for one of the file names.\n",
    "    \"\"\"\n",
    "    X, y = [], []\n",
    "    for number in range(1, len(labels) + 1):\n",
    "        name = 'inmail.' + str(number)\n",
    "        path = os.path.join(DATA_DIR, name)\n",
    "        X.append(email_read_util.extract_email_text(path))\n",
    "        y.append(labels[name])\n",
    "    return X, y"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "X, y = read_email_files()\n",
    "\n",
    "schema = StructType([\n",
    "            StructField('id', IntegerType(), nullable=False),\n",
    "            StructField('email', StringType(), nullable=False),\n",
    "            StructField('label', DoubleType(), nullable=False)])\n",
    "\n",
    "# The labels are Python ints, but the schema declares 'label' as DoubleType.\n",
    "# createDataFrame verifies rows against the schema and rejects an int for a\n",
    "# double column, so cast each label to float first.\n",
    "rows = [(row_id, email, float(label))\n",
    "        for row_id, (email, label) in enumerate(zip(X, y))]\n",
    "\n",
    "df = spark.createDataFrame(rows, schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Expected output:\n",
    "\n",
    "```\n",
    "root\n",
    " |-- id: integer (nullable = false)\n",
    " |-- email: string (nullable = false)\n",
    " |-- label: double (nullable = false)\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "train, test = df.randomSplit([TRAINING_SET_RATIO, 1-TRAINING_SET_RATIO], seed=123)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Pipeline stages; their columns and hyper-parameters are supplied at fit time.\n",
    "tokenizer = Tokenizer()\n",
    "vectorizer = CountVectorizer()\n",
    "# Seed the forest explicitly (same value as the train/test split) so that the\n",
    "# trained model does not depend on an implicit default seed.\n",
    "rfc = RandomForestClassifier(seed=123)\n",
    "\n",
    "pipeline = Pipeline(stages=[tokenizer, vectorizer, rfc])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Column wiring between the stages plus the forest size, keyed by Param.\n",
    "# This mapping is passed to pipeline.fit() as `params`.\n",
    "paramMap = dict([\n",
    "    # raw email text -> tokens\n",
    "    (tokenizer.inputCol, 'email'),\n",
    "    (tokenizer.outputCol, 'tokens'),\n",
    "    # tokens -> count vectors\n",
    "    (vectorizer.inputCol, 'tokens'),\n",
    "    (vectorizer.outputCol, 'vectors'),\n",
    "    # count vectors + label -> classifier\n",
    "    (rfc.featuresCol, 'vectors'),\n",
    "    (rfc.labelCol, 'label'),\n",
    "    (rfc.numTrees, 500),\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "model = pipeline.fit(train, params=paramMap)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "prediction = model.transform(test)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Score the test-set predictions on both metrics with one evaluator,\n",
    "# overriding its metricName per call.\n",
    "evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')\n",
    "pr_score, roc_score = [\n",
    "    evaluator.evaluate(prediction, {evaluator.metricName: metric})\n",
    "    for metric in ('areaUnderPR', 'areaUnderROC')\n",
    "]\n",
    "\n",
    "for template, score in (\n",
    "        (\"Area under ROC curve score: {:.3f}\", roc_score),\n",
    "        (\"Area under precision/recall curve score: {:.3f}\", pr_score)):\n",
    "    print(template.format(score))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Example output from one run:\n",
    "\n",
    "```\n",
    "Area under ROC curve score: 0.971\n",
    "Area under precision/recall curve score: 0.958\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  },
  "widgets": {
   "state": {},
   "version": "1.0.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
